KAFKA-20169: Add ducktape test code for Streams Rebalance Protocol.#22561
KAFKA-20169: Add ducktape test code for Streams Rebalance Protocol.#22561chickenchickenlove wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
Adds end-to-end ducktape coverage for Kafka Streams static membership when using the Streams rebalance protocol (group.protocol=streams), including a new Streams test client/service that persists the Streams processId across restarts and assertions that surviving members avoid reconciliation during a temporary bounce.
Changes:
- Introduces a new ducktape system test that repeatedly bounces static Streams members and validates persisted
processIdreuse + no survivor reconciliation. - Adds a new Streams test client/service pair to persist state (and therefore the Streams
processId) on disk across restarts. - Updates Streams/client logic and unit tests to allow static membership with the streams protocol and to handle
UNRELEASED_INSTANCE_IDas a fatal heartbeat error.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/kafkatest/tests/streams/streams_static_membership_streams_protocol_test.py | New ducktape test validating static membership stability under the Streams group protocol across rolling bounces. |
| tests/kafkatest/services/streams.py | Adds a ducktape service wrapper for the new persistent-process-id Streams test client. |
| streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberPersistentProcessIdTestClient.java | New Streams system-test client that uses a persistent state store to ensure processId is reused after restart. |
| streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java | Updates config tests to assert group.instance.id is allowed when group.protocol=streams. |
| streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | Removes the config-time rejection of static membership when the streams protocol is enabled. |
| clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java | Adds unit coverage for static-member close semantics and expected leave epochs. |
| clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java | Extends tests for UNRELEASED_INSTANCE_ID handling and validates poll-on-close request fields for static members. |
| clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java | Treats UNRELEASED_INSTANCE_ID as a fatal heartbeat error. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| checkpoints = { | ||
| processor: { | ||
| "log": self._line_count(processor, processor.LOG_FILE), | ||
| "stdout": self._line_count(processor, processor.STDOUT_FILE), | ||
| } | ||
| for processor in processors | ||
| } | ||
|
|
||
| verify_stopped(bounced, self.stopped_message) | ||
| verify_running(bounced, self.running_message) | ||
|
|
||
| self.assert_same_process_id_reused(bounced, checkpoints[bounced]["log"], baseline_process_ids[bounced]) | ||
|
|
||
| for survivor in processors: | ||
| if survivor is bounced: | ||
| continue | ||
| self.assert_survivor_was_unaffected(survivor, checkpoints[survivor]["log"]) |
| if (args.length < 1) { | ||
| System.err.println(TEST_NAME + " requires one argument (properties-file) but none provided: "); | ||
| } | ||
|
|
||
| System.out.println("StreamsTest instance started"); | ||
|
|
||
| final String propFileName = args[0]; | ||
| final Properties streamsProperties = Utils.loadProps(propFileName); |
| @@ -0,0 +1,192 @@ | |||
| # Licensed to the Apache Software Foundation (ASF) under one or more | |||
There was a problem hiding this comment.
This is largely a copy of the existing static membership test infra (StreamsStaticMembershipTest, StaticMemberTestService, StaticMemberTestClient). The only real new bits are the persistent store so the processId survives restart and group.protocol=streams. Could we instead adapt the existing test/service rather than forking new copies - e.g. parameterize StaticMemberTestService with group_protocol and add the persistent store to the existing client? The new assertion (survivor non-reconciliation / processId reuse) can be a new test method on the existing class.
There was a problem hiding this comment.
@lucasbru
Thanks for the review!
You are right. I will fix this.
…eams rebalance protocol at Client Side.
f2c7c52 to
0d1a0d9
Compare
|
@lucasbru ====================================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.14.0
session_id: 2026-06-21--008
run time: 3 minutes 38.640 seconds
tests run: 2
passed: 2
flaky: 0
failed: 0
ignored: 0
====================================================================================================
test_id: kafkatest.tests.streams.streams_static_membership_test.StreamsStaticMembershipTest.test_rolling_bounces_will_not_trigger_rebalance_under_static_membership.metadata_quorum=ISOLATED_KRAFT
status: PASS
run time: 1 minute 37.775 seconds
----------------------------------------------------------------------------------------------------
test_id: kafkatest.tests.streams.streams_static_membership_test.StreamsStaticMembershipTest.test_temporary_static_rejoin_does_not_trigger_survivor_reconciliation.metadata_quorum=ISOLATED_KRAFT
status: PASS
run time: 2 minutes 0.611 seconds
----------------------------------------------------------------------------------------------------When you get a chance, please take another look. 🙇♂️ |
Summary
This PR is based on the
KAFKA-20169-CLIENT-COREbranch.(#22559) It adds a new ducktape
system test to verify that Static Membership works correctly with the
Streams rebalance protocol.
The new test covers the following behavior:
processIdafterrestart.
another static member temporarily bounces and rejoins.
each static member to validate stable behavior across restarts.
Changes
StaticMemberPersistentProcessIdTestClient, a Streams testclient with a persistent state store so the Streams
processIdiswritten to disk and reused after restart.
StaticMemberPersistentProcessIdTestServiceforducktape.
StreamsStaticMembershipStreamsProtocolTest.test_temporary_static_rejoin_does_not_trigger_survivor_reconciliationTest Results
Reviewers: Lucas Brutschy lbrutschy@confluent.io